/*
 * Copyright 2019 WeBank
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.webank.wedatasphere.linkis.instance.label.service.impl;

import com.webank.wedatasphere.linkis.common.ServiceInstance;
import com.webank.wedatasphere.linkis.common.utils.ClassUtils;
import com.webank.wedatasphere.linkis.common.utils.Utils;
import com.webank.wedatasphere.linkis.instance.label.async.AsyncConsumerQueue;
import com.webank.wedatasphere.linkis.instance.label.async.GenericAsyncConsumerQueue;
import com.webank.wedatasphere.linkis.instance.label.conf.InsLabelConf;
import com.webank.wedatasphere.linkis.instance.label.dao.InsLabelRelationDao;
import com.webank.wedatasphere.linkis.instance.label.dao.InstanceInfoDao;
import com.webank.wedatasphere.linkis.instance.label.dao.InstanceLabelDao;
import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabel;
import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabelValue;
import com.webank.wedatasphere.linkis.instance.label.entity.InstanceInfo;
import com.webank.wedatasphere.linkis.instance.label.service.InsLabelAccessService;
import com.webank.wedatasphere.linkis.instance.label.service.annotation.AdapterMode;
import com.webank.wedatasphere.linkis.instance.label.vo.InsPersistenceLabelSearchVo;
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactory;
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
import com.webank.wedatasphere.linkis.manager.label.builder.factory.StdLabelBuilderFactory;
import com.webank.wedatasphere.linkis.manager.label.entity.Label;
import com.webank.wedatasphere.linkis.manager.label.entity.UserModifiable;
import com.webank.wedatasphere.linkis.manager.label.utils.LabelUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.commons.lang.math.NumberUtils.isNumber;

@AdapterMode
@Service
public class DefaultInsLabelService implements InsLabelAccessService {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultInsLabelService.class);
    private static Set<String> modifiableLabelKeyList;
    @Autowired
    private InstanceLabelDao instanceLabelDao;
    @Autowired
    private InstanceInfoDao instanceDao;
    @Autowired
    private InsLabelRelationDao insLabelRelationDao;
    private AsyncConsumerQueue<InsPersistenceLabel> asyncRemoveLabelQueue;
    private InsLabelAccessService selfService;
    private AtomicBoolean asyncQueueInit = new AtomicBoolean(false);
    private LabelBuilderFactory labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory();

    /**
     * init method
     */
    private synchronized void initQueue(){
        if(!asyncQueueInit.get()) {
            selfService = (InsLabelAccessService) AopContext.currentProxy();
            LOG.info("SelfService: [" + this.getClass().getName() + "]");
            asyncRemoveLabelQueue = new GenericAsyncConsumerQueue<>(InsLabelConf.ASYNC_QUEUE_CAPACITY.getValue());
            asyncRemoveLabelQueue.consumer(InsLabelConf.ASYNC_QUEUE_CONSUME_BATCH_SIZE.getValue(),
                    InsLabelConf.ASYNC_QUEUE_CONSUME_INTERVAL.getValue(), TimeUnit.SECONDS, insLabels -> {
                        selfService.removeLabelsIfNotRelation(insLabels);
                    });
            asyncQueueInit.set(true);
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void attachLabelToInstance(Label<?> label, ServiceInstance serviceInstance) {
        attachLabelsToInstance(Collections.singletonList(label), serviceInstance);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void attachLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) {
        List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
        List<InsPersistenceLabel> labelsNeedInsert = filterLabelNeededInsert(insLabels, true);
        if(!labelsNeedInsert.isEmpty()) {
            LOG.info("Persist labels: [" + LabelUtils.Jackson.toJson(labels, null) + "]");
            doInsertInsLabels(labelsNeedInsert);
        }
        LOG.info("Insert/Update service instance info: [" + serviceInstance + "]");
        doInsertInstance(serviceInstance);
        List<Integer> insLabelIds = insLabels.stream().map(InsPersistenceLabel :: getId).collect(Collectors.toList());
        LOG.trace("Build relation between labels: " +
                LabelUtils.Jackson.toJson(insLabelIds, null)
        + " and instance: [" + serviceInstance.getInstance() + "]");
        batchOperation(insLabelIds, subInsLabelIds -> insLabelRelationDao.insertRelations(serviceInstance.getInstance(), subInsLabelIds), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void refreshLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) {
        List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
        //Label candidate to be removed
        List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance());
        if(!labelsCandidateRemoved.isEmpty()){
            labelsCandidateRemoved.removeAll(insLabels);
        }
        LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]");
        insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance());
        //Attach labels to instance
        attachLabelsToInstance(insLabels, serviceInstance);
        // Async to delete labels that have no relationship
   /*     if(!labelsCandidateRemoved.isEmpty()){
            if(!asyncQueueInit.get()) {
                initQueue();
            }
            labelsCandidateRemoved.forEach( label -> {
                if(!asyncRemoveLabelQueue.offer(label)){
                    LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]");
                }
            });
        }*/
    }

    @Override
    @Transactional(rollbackFor=Exception.class)
    public void removeLabelsFromInstance(ServiceInstance serviceInstance) {
        //Label candidate to be removed
        List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance());
        LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]");
        insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance());
        // Async to delete labels that have no relationship
        if(!labelsCandidateRemoved.isEmpty()){
            labelsCandidateRemoved.forEach( label -> {
                if(!asyncQueueInit.get()) {
                    initQueue();
                }
                if(!asyncRemoveLabelQueue.offer(label)){
                    LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]");
                }
            });
        }
    }


    @Override
    public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels) {
        return searchInstancesByLabels(labels, Label.ValueRelation.ALL);
    }

    @Override
    public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels, Label.ValueRelation relation) {
        List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
        if (!insLabels.isEmpty()) {
            List<Map<String, String>> valueContent = new ArrayList<>();
            AtomicBoolean searchByValues = new AtomicBoolean(false);
            insLabels.forEach(insLabel -> {
                //It means that the labels provided is not regular,
                //so we should search instances by key-value map of labels
                if (StringUtils.isBlank(insLabel.getStringValue())) {
                    searchByValues.set(true);
                }
                valueContent.add(insLabel.getValue());
            });
            List<InstanceInfo> instanceInfoList = new ArrayList<>();
            if ((relation != Label.ValueRelation.ALL || searchByValues.get()) && valueContent.size() > 0) {
                instanceInfoList = insLabelRelationDao.searchInsDirectByValues(valueContent, relation.name());
            } else if (relation == Label.ValueRelation.ALL && !searchByValues.get()) {
                instanceInfoList = insLabelRelationDao.searchInsDirectByLabels(insLabels);
            }
            return instanceInfoList.stream().map(instanceInfo -> (ServiceInstance)instanceInfo).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    @Override
    public List<ServiceInstance> searchUnRelateInstances(ServiceInstance serviceInstance) {
        if(null != serviceInstance) {
            return insLabelRelationDao.searchUnRelateInstances(new InstanceInfo(serviceInstance))
                    .stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    @Override
    public List<ServiceInstance> searchLabelRelatedInstances(ServiceInstance serviceInstance) {
        if(null != serviceInstance){
            return insLabelRelationDao.searchLabelRelatedInstances(new InstanceInfo(serviceInstance))
                    .stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList());
        }
        return Collections.emptyList();
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void removeLabelsIfNotRelation(List<? extends Label<?>> labels) {
        List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
        insLabels.forEach(insLabel -> {
            if(Optional.ofNullable(insLabel.getId()).isPresent()){
                insLabel = instanceLabelDao.selectForUpdate(insLabel.getId());
            }else{
                insLabel = instanceLabelDao.searchForUpdate(insLabel.getLabelKey(), insLabel.getStringValue());
            }
            if(null != insLabel) {
                Integer exist = insLabelRelationDao.existRelations(insLabel.getId());
                //Not exist
                if(null == exist){
                    LOG.info("Remove information of instance label: [" + insLabel.toString() + "]");
                    instanceLabelDao.remove(insLabel);
                    instanceLabelDao.doRemoveKeyValues(insLabel.getId());
                }
            }
        });
    }

    @Override
    public List<InstanceInfo> listAllInstanceWithLabel() {
        List<InstanceInfo> instances = insLabelRelationDao.listAllInstanceWithLabel();
        return instances;
    }

    public String getEurekaURL() throws Exception {
        String eurekaURL = InsLabelConf.EUREKA_IPADDRESS.getValue();
        String realURL = eurekaURL;
        if(eurekaURL.isEmpty()) {
            String defaultZone = InsLabelConf.EUREKA_URL.getValue();
            String tmpURL = urlPreDeal(defaultZone);
            realURL = replaceEurekaURL(tmpURL);
        }
        return realURL;
    }

    private String urlPreDeal(String defaultZone) {
        if(defaultZone.contains(",")){
            defaultZone = defaultZone.split(",")[0];
        }
        String url = defaultZone;
        if (defaultZone.matches("https?://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]")) {
            if (defaultZone.toLowerCase().contains("eureka")) {
                url = defaultZone.split("eureka", 2)[0];
            }
        }
        return url;
    }

    private String replaceEurekaURL(String tmpURL) throws URISyntaxException, UnknownHostException {
        URI uri = new URI(tmpURL);
        if(isIPAddress(uri.getHost())){
            if(uri.getHost().equals("127.0.0.1")){
                String ip = Utils.getLocalHostname();
                return tmpURL.replace("127.0.0.1", ip);
            }else{
                return tmpURL;
            }
        }else{
            String hostname = uri.getHost();
            InetAddress address = InetAddress.getByName(hostname);
            String realAddress = address.getHostAddress();
            return tmpURL.replace(hostname, realAddress);
        }
    }

    private boolean isIPAddress(String tmpURL) {
        String[] urlArray = tmpURL.split(".");
        if(urlArray.length == 4) {
            if (isNumber(urlArray[0]) && isNumber(urlArray[1]) && isNumber(urlArray[2]) && isNumber(urlArray[3])) {
                return true;
            }
        }
        return false;
    }

    /**
     * Filter labels
     * @param insLabels labels
     * @return
     */
    private List<InsPersistenceLabel> filterLabelNeededInsert(List<InsPersistenceLabel> insLabels, boolean needLock){
        List<InsPersistenceLabelSearchVo> labelSearchVos = insLabels.stream().map(InsPersistenceLabelSearchVo :: new).collect(Collectors.toList());
        List<InsPersistenceLabel> storedLabels = new ArrayList<>();
        if(!labelSearchVos.isEmpty()){
            storedLabels = instanceLabelDao.search(labelSearchVos);
        }
        if(!storedLabels.isEmpty()){
            List<InsPersistenceLabel> labelsNeedInsert = new ArrayList<>(insLabels);
            List<InsPersistenceLabel> finalStoredLabels = storedLabels;
            labelsNeedInsert.removeIf(labelNeedInsert -> {
                for(InsPersistenceLabel storedLabel : finalStoredLabels){
                    if(labelNeedInsert.equals(storedLabel)){
                        Integer labelId = storedLabel.getId();
                        labelNeedInsert.setId(labelId);
                        if(needLock) {
                            //Update to lock the record
                            return instanceLabelDao.updateForLock(labelId) >= 0;
                        }
                        return true;
                    }
                }
                return false;
            });
            return labelsNeedInsert;
        }
        return insLabels;
    }

    /**
     * Operation of inserting labels
     * @param insLabels labels
     */
    private void doInsertInsLabels(List<InsPersistenceLabel> insLabels){
        //Try to insert, use ON DUPLICATE KEY on mysql
        batchOperation(insLabels, subInsLabels -> instanceLabelDao.insertBatch(subInsLabels), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
        List<InsPersistenceLabelValue> labelValues = insLabels.stream().flatMap(insLabel -> {
            if(!Optional.ofNullable(insLabel.getId()).isPresent()){
                throw new IllegalArgumentException("Cannot get the generated id from bulk insertion, please check your mybatis version");
            }
            return insLabel.getValue().entrySet().stream().map( entry ->
                    new InsPersistenceLabelValue(insLabel.getId(), entry.getKey(), entry.getValue()));
        }).collect(Collectors.toList());
        batchOperation(labelValues, subLabelValues -> instanceLabelDao.doInsertKeyValues(subLabelValues), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
    }

    /**
     * Operation of inserting instances
     * @param serviceInstance service instance
     */
    private void doInsertInstance(ServiceInstance serviceInstance){
        //ON DUPLICATE KEY
        try{
            instanceDao.insertOne(new InstanceInfo(serviceInstance));
        }catch (Exception e){

        }
    }

    /**
     * Transform to <em>InsPersistenceLabel</em>
     * @param labels
     * @return
     */
    private List<InsPersistenceLabel> toInsPersistenceLabels(List<? extends Label<?>> labels){
        LabelBuilderFactory builderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory();
        return labels.stream().map(label -> {
            if(label instanceof InsPersistenceLabel){
                return (InsPersistenceLabel)label;
            }
            InsPersistenceLabel insLabel = builderFactory.convertLabel(label, InsPersistenceLabel.class);
            insLabel.setStringValue(label.getStringValue());
            if(StringUtils.isNotBlank(insLabel.getStringValue())){
                insLabel.setLabelValueSize(insLabel.getValue().size());
            }
            return insLabel;
        }).collect(Collectors.toList());
    }

    @SuppressWarnings("unchecked")
    private <T extends List<?>>void batchOperation(T input, Consumer<T> batchFunc, int batchSize){
        int listLen = input.size();
        if(listLen > 0) {
            for (int from = 0; from < listLen; ) {
                int to = Math.min(from + batchSize, listLen);
                T subInput = (T) input.subList(from, to);
                batchFunc.accept(subInput);
                from = to;
            }
        }
    }

    public void markInstanceLabel(List<InstanceInfo> instances) {
        Set<String> keyList = LabelUtils.listAllUserModifiableLabel();
        for(InstanceInfo instance: instances){
            List<InsPersistenceLabel> labels = instance.getLabels();
            if(!CollectionUtils.isEmpty(labels)){
                for(InsPersistenceLabel label: labels){
                    if(keyList.contains(label.getLabelKey())){
                        label.setModifiable(true);
                    }
                }
            }
        }
    }


}
